package com.yeeyk.demo.handler.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import com.yeeyk.demo.common.Constants;
import com.yeeyk.demo.dto.ExcelRom;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Spring component that runs a small Storm topology (spout -> splitter bolt -> writer bolt)
 * inside an in-process {@link LocalCluster}.
 */
@Component
public class MessageTopology {

    /**
     * Builds the topology, submits it to a fresh {@link LocalCluster} and blocks until the
     * latch handed to {@code MessageSpout} is counted down, then tears the cluster down.
     *
     * <p>NOTE(review): the latch is presumably counted down by {@code MessageSpout} once all
     * rows have been emitted/processed; that class is not visible here, so confirm there.
     *
     * @param data rows handed to the spout as the topology's input
     * @return {@code true} if the topology was submitted and the latch was released;
     *         {@code false} if submission failed or the wait was interrupted
     * @throws Exception if the local cluster cannot be created or shut down
     */
    public boolean handerShell(List<ExcelRom> data) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        LocalCluster cluster = new LocalCluster();
        // Only a topology that was actually submitted can be killed.
        boolean submitted = false;
        // Remember an interrupt so it can be restored after the cluster has been torn down.
        boolean interrupted = false;
        try {
            TopologyBuilder builder = new TopologyBuilder();
            // Spout parallelism hint 2: two executors, and (task count not overridden) two tasks.
            builder.setSpout(Constants.StormConstants.DATA_SPOUT_NAME, new MessageSpout(data, countDownLatch), 2);
            // Handler bolt: two executors sharing four tasks.
            builder.setBolt(Constants.StormConstants.HANDLER_BOLT_NAME, new SpliterBolt(), 2)
                    .shuffleGrouping(Constants.StormConstants.DATA_SPOUT_NAME).setNumTasks(4);
            // Output bolt: six executors, and (task count not overridden) six tasks.
            builder.setBolt(Constants.StormConstants.OUT_BOLT_NAME, new WriterBolt(), 6)
                    .shuffleGrouping(Constants.StormConstants.HANDLER_BOLT_NAME);
            Config config = new Config();
            config.setDebug(false);
            cluster.submitTopology(Constants.StormConstants.TOPO_NAME, config, builder.createTopology());
            submitted = true;
            countDownLatch.await();
            return true;
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
            return false;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        } finally {
            try {
                stopCluster(cluster, submitted);
            } finally {
                if (interrupted) {
                    // Restore the flag last so the teardown above is not itself interrupted.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Kills the topology (if it was submitted) and always shuts the cluster down, so a failing
     * kill can neither leak the cluster nor replace the caller's result with an exception.
     */
    private void stopCluster(LocalCluster cluster, boolean submitted) {
        try {
            if (submitted) {
                cluster.killTopology(Constants.StormConstants.TOPO_NAME);
            }
        } catch (Exception e) {
            // Best effort: shutdown() below still releases the cluster's resources.
            e.printStackTrace();
        } finally {
            cluster.shutdown();
        }
    }

}
